GCSToBigQueryOperatorでGCSからBigQueryにJSONの必要なフィールドのみロードしたい
Apache AirflowでGCSにあるファイルをBigQueryにロードさせるには、GCSToBigQueryOperatorを使ってワークフローを構築していくことができます。
JSONファイルをロードする際、全てではなく必要なフィールドのみをロードしてみたかったので、方法を残しておこうと思います。
AIrflowのバージョンは、
$ airflow version 2.6.3
です。
jsonファイルはGoogle Cloudのドキュメントに載っているものを利用します。
{"id":"1","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]} {"id":"2","first_name":"Jane","last_name":"Doe","dob":"1980-10-16","addresses":[{"status":"current","address":"789 Any Avenue","city":"New York","state":"NY","zip":"33333","numberOfYears":"2"},{"status":"previous","address":"321 Main Street","city":"Hoboken","state":"NJ","zip":"44444","numberOfYears":"3"}]}
ここで欲しいものは、
id
, first_name
, last_name
, dob
とします。
schema_fieldsで定義
スキーマのリストをでフィールドを指定します。
※ schema_object'がNULLでautodetectがFalseの場合、パラメータを定義する必要がある
ドキュメントを参考にしながら、DAGのタスク部分を以下のようなコードにしてみました。
GCSToBigQueryOperator( gcp_conn_id=CONNECTION_ID, task_id="load", bucket='airflow-events', source_objects=["sample/year=2023/month=08/day=01/*.json"], destination_project_dataset_table=f'{DATASET_NAME}.users', source_format='NEWLINE_DELIMITED_JSON', write_disposition='WRITE_APPEND', create_disposition='CREATE_IF_NEEDED', autodetect=False, ignore_unknown_values=True, schema_update_options=['ALLOW_FIELD_ADDITION'], schema_fields=[ {"name": "id", "type": "INTEGER"}, {"name": "first_name", "type": "STRING"}, {"name": "last_name", "type": "STRING"}, {"name": "dob", "type": "DATE"}, ], )
このオペレーターを実装したDAGを実行すると、BigQuery側にデータがロードされていました。
schema_objectで定義
テーブルのスキーマを含む .json ファイルを指すGCSオブジェクト・パスを指定。
※ 'schema_fields' が NULL で autodetect が False の場合に定義する必要があります。
今回の場合だと、以下のような形式でスキーマの情報を定義し、GCSにjsonファイルとして保存しておく必要があります。
[ { "name": "id", "type": "INTEGER", "mode": "NULLABLE" }, { "name": "first_name", "type": "STRING", "mode": "NULLABLE" }, { "name": "last_name", "type": "STRING", "mode": "NULLABLE" }, { "name": "dob", "type": "DATE", "mode": "NULLABLE" } ]
DAGのタスク部分を以下のようなコードにしてみました。
保存したJSONが、例えば
gs://airflow-sample-events/myschema.json
だったとしたら、schema_object
は パスの部分 (myschema.json
) となります。
GCSToBigQueryOperator( gcp_conn_id=CONNECTION_ID, task_id="load", bucket='airflow-events', source_objects=["sample/year=2023/month=08/day=01/*.json"], destination_project_dataset_table=f'{DATASET_NAME}.users', source_format='NEWLINE_DELIMITED_JSON', write_disposition='WRITE_APPEND', create_disposition='CREATE_IF_NEEDED', autodetect=False, ignore_unknown_values=True, schema_update_options=['ALLOW_FIELD_ADDITION'], schema_object="myschema.json", )
このオペレーターを実装したDAGを実行すると、schema_objectと同様にBigQuery側にデータがロードされていました。